{#- Collect one "address:9092" entry per host in the `kafka` inventory group.
    The append is done in a `set` statement rather than an output tag so that
    its None return value can never be rendered into the generated config. -#}
{%- set mhosts=[] -%}
{%- for host in groups.kafka -%}
{%- set _ = mhosts.append(hostvars[host]['ansible_host'] + ':9092') -%}
{%- endfor -%}
# Read events from the "kafka-topic" Kafka topic and decode each record as JSON.
input {
    kafka {
        # Comma-separated broker list rendered from the template's mhosts list.
        bootstrap_servers => "{{ mhosts | join(',') }}"
        topics => "kafka-topic"
        codec => "json"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
    }
}

# Decode the JSON document carried in "message" and give its numeric fields
# real numeric types.
filter {
    # Rewrite "\x" sequences in the raw payload before it is parsed; JSON has
    # no \x escape, so this is presumably what keeps the json filter below from
    # rejecting such payloads. The pattern and replacement are unchanged.
    mutate {
        gsub => ["message", "\\x", "\\\x"]
    }
    # Expand the payload into top-level event fields. The fields listed in
    # remove_field are dropped only when parsing succeeds.
    json {
        source => "message"
        remove_field => ["message", "beat", "kafka"]
    }
    # Conversions run after the json filter: a field that only exists once
    # "message" has been decoded would be silently skipped if converted earlier.
    # NOTE(review): fields that used to reach Elasticsearch as strings will now
    # arrive as numbers; check existing index mappings before rollout.
    mutate {
        convert => {
            "status" => "integer"
            "bytes_sent" => "integer"
            "upstream_response_length" => "integer"
            "request_time" => "float"
            "upstream_response_time" => "float"
        }
    }
}

{% set nhosts=[] -%}
{#- Build the Elasticsearch host list as quoted "address:9200" strings.
    The `elasticMaster` group is preferred; `elasticsearch` is used when the
    former is undefined or empty. `default([])` keeps a missing group from
    aborting the render. Every entry is quoted because the list is joined
    verbatim inside a Logstash array literal. -#}
{%- set es_hosts = (groups['elasticMaster'] | default([])) or (groups['elasticsearch'] | default([])) -%}
{%- for host in es_hosts -%}
{%- set _ = nhosts.append('"' + hostvars[host]['ansible_host'] + ':9200"') -%}
{%- endfor -%}
# Index events whose [fields][log_topic] is "kafka-topic" into Elasticsearch.
output {
    if [fields][log_topic] == "kafka-topic" {
        elasticsearch {
            # nhosts entries are joined verbatim, so each one must already be a
            # quoted string.
            hosts => [{{ nhosts | join(',') }}]
            # Index name carries the event date (YYYY.MM.dd).
            index => "kafka-topic-%{+YYYY.MM.dd}"
            action => "index"
            manage_template => false
        }
        #stdout{}
    }
}